package com.open.cloud.leaf.segment;

import com.open.cloud.leaf.IDGen;
import com.open.cloud.leaf.core.common.Result;
import com.open.cloud.leaf.core.common.Status;
import com.open.cloud.leaf.segment.dao.IDAllocDao;
import com.open.cloud.leaf.segment.model.LeafAlloc;
import com.open.cloud.leaf.segment.model.Segment;
import com.open.cloud.leaf.segment.model.SegmentBuffer;
import org.perf4j.StopWatch;
import org.perf4j.slf4j.Slf4JStopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Segment-based {@link IDGen} implementation.
 *
 * <p>Each business tag known to the database is mapped to a {@link SegmentBuffer} in an
 * in-memory cache. IDs are handed out from the buffer's current {@link Segment}; segments
 * are (re)filled from the database through {@link IDAllocDao}. The set of cached tags is
 * synchronised with the database at start-up, once a minute afterwards, and on demand when
 * {@link #get(String)} is asked for a tag that is not cached yet.
 */
public class SegmentIDGenImpl implements IDGen {
	private static final Logger logger = LoggerFactory
			.getLogger(SegmentIDGenImpl.class);

	/**
	 * Error code returned when the ID cache has not been initialised successfully.
	 */
	private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;
	/**
	 * Error code returned when the requested key does not exist.
	 */
	private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2;
	/**
	 * Error code returned when neither Segment of a SegmentBuffer has been loaded from the DB.
	 */
	private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3;
	/**
	 * Upper bound for the dynamically adjusted step: 1,000,000.
	 */
	private static final int MAX_STEP = 1000000;
	/**
	 * Target lifetime of one Segment: 15 minutes.
	 */
	private static final long SEGMENT_DURATION = 15 * 60 * 1000L;
	/**
	 * Pool that runs the asynchronous "load next segment" tasks.
	 */
	private final ExecutorService service = new ThreadPoolExecutor(
			5,
			Integer.MAX_VALUE,
			60L,
			TimeUnit.SECONDS,
			new SynchronousQueue<Runnable>(),
			new UpdateThreadFactory());
	private volatile boolean initOK = false;
	/**
	 * Tag -> buffer cache. Declared as ConcurrentMap so that concurrent refreshes can use
	 * putIfAbsent and never replace a buffer that is already in use.
	 */
	private final ConcurrentMap<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>();
	private IDAllocDao dao;

	/**
	 * Thread factory for the segment-update pool; names threads
	 * {@code Thread-Segment-Update-<n>} with a process-wide counter.
	 */
	public static class UpdateThreadFactory implements ThreadFactory {

		private static int threadInitNumber = 0;

		private static synchronized int nextThreadNum() {
			return threadInitNumber++;
		}

		@Override
		public Thread newThread(Runnable r) {
			return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());
		}
	}

	/**
	 * Loads the tags from the database into the cache and starts the periodic refresh.
	 *
	 * @return the value of the init flag, which is {@code true} after this call
	 */
	@Override
	public boolean init() {
		logger.info("Init ...");
		// Load the tags into the cache before marking the generator as initialised.
		// NOTE: updateCacheFromDb() logs and swallows failures, so the flag is set even if
		// this first load failed; the periodic refresh below will retry.
		updateCacheFromDb();
		initOK = true;
		updateCacheFromDbAtEveryMinute();
		return initOK;
	}

	/**
	 * Schedules {@link #updateCacheFromDb()} on a dedicated daemon thread, first after
	 * 60 seconds and then with a fixed delay of 60 seconds between runs.
	 */
	private void updateCacheFromDbAtEveryMinute() {
		// Named differently from the "service" field to avoid shadowing it.
		ScheduledExecutorService scheduler = Executors
				.newSingleThreadScheduledExecutor(new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						Thread t = new Thread(r);
						t.setName("check-idCache-thread");
						t.setDaemon(true);
						return t;
					}
				});
		scheduler.scheduleWithFixedDelay(new Runnable() {
			@Override
			public void run() {
				updateCacheFromDb();
			}
		}, 60, 60, TimeUnit.SECONDS);
	}

	/**
	 * Synchronises the set of cached tags with the tags stored in the database: tags that
	 * are new in the DB get an empty, not-yet-initialised buffer; tags that disappeared from
	 * the DB are evicted. If the DB returns no tags at all the cache is left untouched.
	 * Any exception is logged and swallowed.
	 */
	private void updateCacheFromDb() {
		logger.info("update cache from db");
		StopWatch sw = new Slf4JStopWatch();
		try {
			List<String> dbTags = dao.getAllTags();
			if (dbTags == null || dbTags.isEmpty()) {
				return;
			}
			List<String> cacheTags = new ArrayList<String>(cache.keySet());
			List<String> insertTags = new ArrayList<String>(dbTags);
			List<String> removeTags = new ArrayList<String>(cacheTags);
			// Add the tags that are new in the DB to the cache.
			insertTags.removeAll(cacheTags);
			for (String tag : insertTags) {
				SegmentBuffer buffer = new SegmentBuffer();
				buffer.setKey(tag);
				Segment segment = buffer.getCurrent();
				segment.setValue(new AtomicLong(0));
				segment.setMax(0);
				segment.setStep(0);
				// This method can run concurrently (scheduler thread and get() callers);
				// putIfAbsent guarantees a buffer that another thread already published,
				// and may already be serving IDs from, is never replaced.
				if (cache.putIfAbsent(tag, buffer) == null) {
					logger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);
				}
			}
			// Remove the tags that no longer exist in the DB from the cache.
			removeTags.removeAll(dbTags);
			for (String tag : removeTags) {
				cache.remove(tag);
				logger.info("Remove tag {} from IdCache", tag);
			}
		} catch (Exception e) {
			logger.warn("update cache from db exception", e);
		} finally {
			sw.stop("updateCacheFromDb");
		}
	}

	/**
	 * Kept for backward compatibility; it used to be a verbatim copy of
	 * {@link #updateCacheFromDb()} and now simply delegates to it.
	 */
	private void insertDb() {
		updateCacheFromDb();
	}

	/**
	 * Returns the next ID for the given key.
	 *
	 * @param key business tag to allocate an ID for
	 * @return a {@code SUCCESS} result carrying the ID, or an {@code EXCEPTION} result
	 *         carrying one of the negative error codes defined in this class
	 */
	@Override
	public Result get(final String key) {
		if (!initOK) {
			return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
		}
		if (key == null) {
			// ConcurrentHashMap rejects null keys; report it as an unknown key.
			return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
		}
		SegmentBuffer buffer = cache.get(key);
		if (buffer == null) {
			// The tag may have been added to the DB since the last refresh: reload once.
			// The init flag must NOT be cleared here, otherwise a single unknown key
			// would make every subsequent call fail with EXCEPTION_ID_IDCACHE_INIT_FALSE.
			updateCacheFromDb();
			buffer = cache.get(key);
			if (buffer == null) {
				return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
			}
		}
		if (!buffer.isInitOk()) {
			synchronized (buffer) {
				// Re-check under the buffer's monitor so only one thread does the first load.
				if (!buffer.isInitOk()) {
					try {
						updateSegmentFromDb(key, buffer.getCurrent());
						logger.info("Init buffer. Update leafkey {} {} from db", key,
								buffer.getCurrent());
						buffer.setInitOk(true);
					} catch (Exception e) {
						logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
					}
				}
			}
		}
		// Use the buffer resolved above: looking the key up again could yield null if the
		// periodic refresh evicted the tag in the meantime.
		return getIdFromSegmentBuffer(buffer);
	}

	/**
	 * Allocates a new ID range in the database for {@code key} and stores it in
	 * {@code segment}.
	 *
	 * <p>On the first load of a buffer, and on the first load after initialisation, the
	 * step stored in the DB is used. Afterwards the step is adapted to how long the
	 * previous segment lasted: doubled (up to {@link #MAX_STEP}) if it lasted less than
	 * {@link #SEGMENT_DURATION}, kept if it lasted less than twice that, and halved
	 * (but not below the buffer's minimum step) otherwise.
	 *
	 * @param key     business tag whose range is allocated
	 * @param segment segment to fill; its owning buffer is updated as well
	 */
	public void updateSegmentFromDb(String key, Segment segment) {
		StopWatch sw = new Slf4JStopWatch();
		SegmentBuffer buffer = segment.getBuffer();
		LeafAlloc leafAlloc;
		if (!buffer.isInitOk()) {
			leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
			buffer.setStep(leafAlloc.getStep());
			// The step carried by leafAlloc is the step stored in the DB.
			buffer.setMinStep(leafAlloc.getStep());
		} else if (buffer.getUpdateTimestamp() == 0) {
			leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
			buffer.setUpdateTimestamp(System.currentTimeMillis());
			buffer.setStep(leafAlloc.getStep());
			// The step carried by leafAlloc is the step stored in the DB.
			buffer.setMinStep(leafAlloc.getStep());
		} else {
			long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
			int nextStep = buffer.getStep();
			if (duration < SEGMENT_DURATION) {
				// Segment was consumed quickly: grow the step unless that exceeds the cap.
				if (nextStep * 2 <= MAX_STEP) {
					nextStep = nextStep * 2;
				}
			} else if (duration >= SEGMENT_DURATION * 2) {
				// Segment lasted long: shrink the step, but never below the minimum step.
				nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
			}
			// Otherwise the duration is within [1x, 2x) of the target: keep the step.
			logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key,
					buffer.getStep(), String.format("%.2f", ((double) duration / (1000 * 60))),
					nextStep);
			LeafAlloc temp = new LeafAlloc();
			temp.setBizTag(key);
			temp.setStep(nextStep);
			leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
			buffer.setUpdateTimestamp(System.currentTimeMillis());
			buffer.setStep(nextStep);
			// The step carried by leafAlloc is the step stored in the DB.
			buffer.setMinStep(leafAlloc.getStep());
		}
		// must set value before set max
		long value = leafAlloc.getMaxId() - buffer.getStep();
		segment.getValue().set(value);
		segment.setMax(leafAlloc.getMaxId());
		segment.setStep(buffer.getStep());
		sw.stop("updateSegmentFromDb", key + " " + segment);
	}

	/**
	 * Takes the next ID from {@code buffer}.
	 *
	 * <p>Under the read lock: if the next segment is not ready, the current segment's
	 * {@code getIdle()} value is below 90% of its step and no loader is running, an
	 * asynchronous load of the other segment is started; then the current counter is
	 * incremented and returned if it is still below the segment's max. Otherwise the
	 * caller briefly waits for a running loader, retries under the write lock, and
	 * switches to the next segment if it is ready.
	 *
	 * @param buffer buffer to allocate from
	 * @return a {@code SUCCESS} result with the ID, or an {@code EXCEPTION} result with
	 *         {@link #EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL} when the current segment is
	 *         exhausted and the next one is not ready
	 */
	public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
		while (true) {
			buffer.rLock().lock();
			try {
				final Segment segment = buffer.getCurrent();
				if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep())
						&& buffer.getThreadRunning().compareAndSet(false, true)) {
					service.execute(new Runnable() {
						@Override
						public void run() {
							Segment next = buffer.getSegments()[buffer.nextPos()];
							boolean updateOk = false;
							try {
								updateSegmentFromDb(buffer.getKey(), next);
								updateOk = true;
								logger.info("update segment {} from db {}", buffer.getKey(), next);
							} catch (Exception e) {
								logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
							} finally {
								if (updateOk) {
									buffer.wLock().lock();
									try {
										buffer.setNextReady(true);
										buffer.getThreadRunning().set(false);
									} finally {
										// Always release, even if a setter throws.
										buffer.wLock().unlock();
									}
								} else {
									buffer.getThreadRunning().set(false);
								}
							}
						}
					});
				}
				long value = segment.getValue().getAndIncrement();
				if (value < segment.getMax()) {
					return new Result(value, Status.SUCCESS);
				}
			} finally {
				buffer.rLock().unlock();
			}
			// Current segment is exhausted: give a running loader a chance to finish.
			waitAndSleep(buffer);
			buffer.wLock().lock();
			try {
				// Another thread may have switched segments already, so try again first.
				final Segment segment = buffer.getCurrent();
				long value = segment.getValue().getAndIncrement();
				if (value < segment.getMax()) {
					return new Result(value, Status.SUCCESS);
				}
				if (buffer.isNextReady()) {
					buffer.switchPos();
					buffer.setNextReady(false);
				} else {
					logger.error("Both two segments in {} are not ready!", buffer);
					return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
				}
			} finally {
				buffer.wLock().unlock();
			}
		}
	}

	/**
	 * Busy-waits while the buffer's loader flag is set; after 10000 spins it sleeps once
	 * for 10 ms and returns regardless of the flag. If the sleep is interrupted the
	 * thread's interrupt status is restored before returning.
	 */
	private void waitAndSleep(SegmentBuffer buffer) {
		int roll = 0;
		while (buffer.getThreadRunning().get()) {
			roll += 1;
			if (roll > 10000) {
				try {
					TimeUnit.MILLISECONDS.sleep(10);
					break;
				} catch (InterruptedException e) {
					logger.warn("Thread {} Interrupted", Thread.currentThread().getName());
					// Preserve the interrupt so callers further up the stack can react.
					Thread.currentThread().interrupt();
					break;
				}
			}
		}
	}

	/**
	 * @return all allocation rows as returned by the DAO
	 */
	public List<LeafAlloc> getAllLeafAllocs() {
		return dao.getAllLeafAllocs();
	}

	/**
	 * @return the live tag -> buffer cache (not a copy)
	 */
	public Map<String, SegmentBuffer> getCache() {
		return cache;
	}

	public IDAllocDao getDao() {
		return dao;
	}

	public void setDao(IDAllocDao dao) {
		this.dao = dao;
	}
}
